
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;


/**
 * Batch job that reads rows from MySQL through Flink's {@link JDBCInputFormat}
 * and prints them.
 *
 * <p>The job runs {@code select * from titles} against the {@code employees}
 * database on host {@code Desktop}. Each result row is declared as four fields:
 * an int, a string and two dates.
 */
public class MysqlSource {

  /**
   * Entry point: builds the JDBC source, maps every row to its string form and
   * prints the result.
   *
   * @param args command-line arguments; not read by this program
   * @throws Exception if building or running the job fails
   */
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Field types of one result row, in the order the query returns them.
    RowTypeInfo titlesRowType = new RowTypeInfo(
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.DATE_TYPE_INFO,
        BasicTypeInfo.DATE_TYPE_INFO);

    // Read from MySQL: driver class name, connection URL, credentials and query.
    JDBCInputFormat titlesInput = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://Desktop:3306/employees?useUnicode=true&characterEncoding=utf-8&useSSL=false")
        .setUsername("appleyuchi")
        .setPassword("appleyuchi")
        .setQuery("select * from titles")
        .setRowTypeInfo(titlesRowType)
        .finish();

    DataSource<Row> dataSource = env.createInput(titlesInput);

    // Fetch the data and print it. Note that every row is written to stdout
    // inside the map function and the mapped strings are then handed to print().
    dataSource.map(new RowToString()).print();
  }

  /** Writes each incoming row to standard output and returns its string form. */
  private static final class RowToString implements MapFunction<Row, String> {

    @Override
    public String map(Row value) throws Exception {
      System.out.println(value);
      return value.toString();
    }
  }
}




